package com.nx.platform.es.service.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import com.google.common.util.concurrent.AbstractIdleService;
import com.nx.platform.es.common.utils.MoreSplitters;
import com.nx.platform.es.common.utils.YamlParser;
import com.nx.platform.es.service.ESClientManager;
import com.nx.platform.es.system.config.ConfigCenter;
import com.nx.platform.es.common.utils.Constants;
import com.nx.platform.es.common.utils.MoreMaps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.jetbrains.annotations.NotNull;

import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Elasticsearch client manager implementation.
 * <p>
 * Clients are built from the YAML configuration stored under
 * {@link Constants#ES_CLIENT} and kept in a {@link Multiset} whose element
 * counts are the configured request weights, so a cluster with weight 3 is
 * picked three times as often as a cluster with weight 1.
 * <p>
 * Created by  on 2017/4/14.
 */
@Slf4j
public class ESClientManagerImpl extends AbstractIdleService implements ESClientManager {

    /** Matches {@code host:port}; group 1 is the host, group 2 the (mandatory) numeric port. */
    private static final Pattern HOST_PATTERN = Pattern.compile("^([^:]+)(?::(\\d++))$");

    // Keys of the YAML configuration.
    private static final String CLUSTERS = "clusters";
    private static final String CLUSTER_CODE = "cluster.code";
    private static final String CLUSTER_HOSTS = "cluster.hosts";
    private static final String REQUEST_WEIGHT = "request.weight";

    // NOTE(review): public mutable static state backed by a plain HashMap (no synchronization).
    // It is not referenced inside this class; kept unchanged for external users.
    public static final Map<RestHighLevelClient, String> client2Host = new HashMap<>();


    /**
     * Picks one client for a request.
     * <ol>
     *   <li>Candidates are the clients whose code is contained in {@code codes}; when
     *       {@code codes} is null/empty, or none of them is known, all clients are candidates.</li>
     *   <li>If only one distinct candidate remains it is returned.</li>
     *   <li>Otherwise the candidate whose code equals {@code target} (case-insensitive) wins.</li>
     *   <li>Otherwise a candidate is drawn at random, proportionally to its weight.</li>
     * </ol>
     *
     * @param codes  cluster codes the caller accepts; may be null or empty
     * @param target preferred cluster code; may be null
     * @return the selected client, never null
     * @throws IllegalStateException if no client is available
     */
    @Override
    public ESClient getResource(Set<String> codes, String target) {
        // all clients
        Multiset<ESClient> allClients = getAllClients();
        Preconditions.checkState(allClients != null && !allClients.isEmpty(),
                "elastic clients null or empty, mabye have not initialled");
        // available clients
        Multiset<ESClient> clients = allClients;
        if (CollectionUtils.isNotEmpty(codes)) {
            clients = HashMultiset.create();
            for (Multiset.Entry<ESClient> entry : allClients.entrySet()) {
                if (codes.contains(entry.getElement().getCode())) {
                    clients.add(entry.getElement(), entry.getCount());
                }
            }
        }
        if (clients.isEmpty()) {
            log.error("clients required{} not found, use other clients instead", codes);
            clients = allClients;
        }
        // have no choice
        if (clients.elementSet().size() == 1) {
            return clients.elementSet().iterator().next();
        }
        // target code
        if (target != null) {
            for (Multiset.Entry<ESClient> entry : clients.entrySet()) {
                if (entry.getElement().getCode().equalsIgnoreCase(target)) {
                    return entry.getElement();
                }
            }
        }
        // weighted random: size() counts every occurrence, i.e. the sum of all weights
        int random = ThreadLocalRandom.current().nextInt(clients.size());
        for (Multiset.Entry<ESClient> entry : clients.entrySet()) {
            if (random < entry.getCount()) {
                return entry.getElement();
            }
            random -= entry.getCount();
        }
        // Never happens
        throw new IllegalStateException("have not random any elastic client");
    }

    /**
     * Fails service start-up when the client configuration cannot be loaded.
     *
     * @throws IllegalStateException if no client set is available
     */
    @Override
    protected void startUp() throws Exception {
        Preconditions.checkState(getAllClients() != null, "elastic clients have not been initialised");
    }

    /**
     * Closes every known client. All clients are attempted even if one of them
     * fails to close; the first failure is rethrown afterwards.
     */
    @Override
    protected void shutDown() throws Exception {
        Multiset<ESClient> clients = getAllClients();
        if (clients == null) {
            return;
        }
        closeAll(clients);
    }

    /**
     * Reads the current client set from the configuration center, registering
     * {@link #parse} and {@link #clean} as callbacks.
     * NOTE(review): presumably parse builds the value from the raw config and clean
     * disposes of a replaced value - confirm against ConfigCenter.
     *
     * @return the current clients, or null when the configuration center has none
     */
    @SuppressWarnings("unchecked")
    private Multiset<ESClient> getAllClients() {
        return (Multiset<ESClient>) ConfigCenter
                .getConfig(Constants.ES_CLIENT, this::parse, this::clean).orElse(null);
    }


    /**
     * Parses the YAML configuration and connects to every configured cluster.
     * The new configuration is accepted only if every usable cluster answers a ping
     * and at least one client was created; otherwise all clients created here are
     * closed again and the configuration is rejected.
     *
     * @param newValue raw YAML configuration
     * @return the connected clients; element counts are the request weights
     * @throws IllegalStateException if a cluster is unreachable or no client was created
     */
    private Multiset<ESClient> parse(@NotNull String newValue) throws Exception {
        Map<?, ?> settings = YamlParser.parseToMap(newValue);
        // parse and connect all configured clusters
        Multiset<ESClient> newClients = HashMultiset.create();
        List<Map<?, ?>> clusters = MoreMaps.getObject(settings, CLUSTERS);
        boolean allHealth = true;
        try {
            // a missing "clusters" key is handled like an empty list (rejected below)
            if (clusters != null) {
                for (Map<?, ?> cluster : clusters) {
                    if (!connectCluster(cluster, newClients)) {
                        // stop at the first unhealthy cluster
                        allHealth = false;
                        break;
                    }
                }
            }
        } catch (RuntimeException e) {
            // e.g. a malformed port or weight: do not leak the clients opened so far
            closeAllQuietly(newClients);
            throw e;
        }
        if (!allHealth || newClients.isEmpty()) {
            // any cluster unreachable: give up this configuration change
            closeAllQuietly(newClients);
            throw new IllegalStateException("not all clients health or clients is empty");
        }
        return newClients;
    }

    /**
     * Creates the client for one cluster definition, registers it in {@code target}
     * with its weight and pings the cluster.
     *
     * @param cluster one entry of the {@code clusters} list
     * @param target  receives the created client
     * @return true if the cluster is healthy or was skipped (no hosts / non-positive
     *         weight); false if the client could not be created or the ping failed
     */
    private boolean connectCluster(Map<?, ?> cluster, Multiset<ESClient> target) {
        // settings
        String clusterCode = MoreMaps.getString(cluster, CLUSTER_CODE, "default");
        String[] nodes = MoreMaps.getStringArray(cluster, CLUSTER_HOSTS, MoreSplitters.COMMA);
        int weight = MoreMaps.getIntValue(cluster, REQUEST_WEIGHT, 1);
        // check
        if (nodes == null || nodes.length == 0 || weight <= 0) {
            log.warn("Skip cluster({}): no hosts configured or non-positive weight({})", clusterCode, weight);
            return true;
        }
        HttpHost[] httpHosts = parseHosts(nodes);
        String hosts = Stream.of(httpHosts).map(HttpHost::toHostString).collect(Collectors.joining(","));
        // init client
        RestHighLevelClient client = null;
        boolean registered = false;
        try {
            client = new RestHighLevelClient(RestClient.builder(httpHosts));
            // register before pinging so the caller closes it if the ping fails
            target.add(new ESClient(clusterCode, hosts, client), weight);
            registered = true;
            // check connection
            log.info("Connecting to cluster({}) ...", hosts);
            if (!client.ping()) {
                log.error("Connect to cluster({}) error: ping returned false.", hosts);
                return false;
            }
            log.info("Connected to cluster({})", hosts);
            return true;
        } catch (Exception e) {
            log.error("Connect to cluster({}) error.", hosts, e);
            if (client != null && !registered) {
                // nobody else knows this client yet, so it has to be closed here
                try {
                    client.close();
                } catch (Exception closeError) {
                    log.warn("Close client of cluster({}) error.", hosts, closeError);
                }
            }
            return false;
        }
    }

    /**
     * Converts {@code host:port} strings into {@link HttpHost}s. Entries that do not
     * match {@link #HOST_PATTERN} are logged and dropped.
     *
     * @throws NumberFormatException if a port does not fit into an int
     */
    private static HttpHost[] parseHosts(String[] nodes) {
        List<HttpHost> result = new ArrayList<>(nodes.length);
        for (String node : nodes) {
            Matcher matcher = HOST_PATTERN.matcher(node.trim());
            if (matcher.find()) {
                result.add(new HttpHost(matcher.group(1).trim(), Integer.parseInt(matcher.group(2))));
            } else {
                log.warn("Ignore invalid elastic host '{}', expected host:port", node);
            }
        }
        return result.toArray(new HttpHost[0]);
    }


    /**
     * Closes the clients of a configuration value that is no longer used.
     *
     * @param oldResult a {@code Multiset<ESClient>} previously returned by {@link #parse}
     */
    @SuppressWarnings("unchecked")
    private void clean(@NotNull Object oldResult) throws Exception {
        Multiset<ESClient> oldClients = (Multiset<ESClient>) oldResult;
        closeAll(oldClients);
    }

    /**
     * Closes every distinct client. A failing close does not prevent the remaining
     * clients from being closed; the first failure is rethrown at the end with the
     * later ones attached as suppressed exceptions.
     */
    private static void closeAll(Multiset<ESClient> clients) throws Exception {
        Exception failure = null;
        for (ESClient client : clients.elementSet()) {
            try {
                client.getClient().close();
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Like {@link #closeAll} but only logs failures, so it cannot mask the caller's own exception. */
    private static void closeAllQuietly(Multiset<ESClient> clients) {
        try {
            closeAll(clients);
        } catch (Exception e) {
            log.warn("Close elastic clients error.", e);
        }
    }

}
